{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "e3da9a3f-f583-4ba6-994e-0e8c1158f5eb",
   "metadata": {},
   "source": [
    "# How to create a custom chat model class\n",
    "\n",
    ":::info Prerequisites\n",
    "\n",
    "This guide assumes familiarity with the following concepts:\n",
    "- [Chat models](/docs/concepts/#chat-models)\n",
    "\n",
    ":::\n",
    "\n",
    "In this guide, we'll learn how to create a custom chat model using LangChain abstractions.\n",
    "\n",
    "Wrapping your LLM with the standard [`BaseChatModel`](https://api.python.langchain.com/en/latest/language_models/langchain_core.language_models.chat_models.BaseChatModel.html) interface allow you to use your LLM in existing LangChain programs with minimal code modifications!\n",
    "\n",
    "As an bonus, your LLM will automatically become a LangChain `Runnable` and will benefit from some optimizations out of the box (e.g., batch via a threadpool), async support, the `astream_events` API, etc.\n",
    "\n",
    "## Inputs and outputs\n",
    "\n",
    "First, we need to talk about **messages**, which are the inputs and outputs of chat models.\n",
    "\n",
    "### Messages\n",
    "\n",
    "Chat models take messages as inputs and return a message as output. \n",
    "\n",
    "LangChain has a few [built-in message types](/docs/concepts/#message-types):\n",
    "\n",
    "| Message Type          | Description                                                                                     |\n",
    "|-----------------------|-------------------------------------------------------------------------------------------------|\n",
    "| `SystemMessage`       | Used for priming AI behavior, usually passed in as the first of a sequence of input messages.   |\n",
    "| `HumanMessage`        | Represents a message from a person interacting with the chat model.                             |\n",
    "| `AIMessage`           | Represents a message from the chat model. This can be either text or a request to invoke a tool.|\n",
    "| `FunctionMessage` / `ToolMessage` | Message for passing the results of tool invocation back to the model.               |\n",
    "| `AIMessageChunk` / `HumanMessageChunk` / ... | Chunk variant of each type of message. |\n",
    "\n",
    "\n",
    "::: {.callout-note}\n",
    "`ToolMessage` and `FunctionMessage` closely follow OpenAI's `function` and `tool` roles.\n",
    "\n",
    "This is a rapidly developing field and as more models add function calling capabilities. Expect that there will be additions to this schema.\n",
    ":::"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "c5046e6a-8b09-4a99-b6e6-7a605aac5738",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from langchain_core.messages import (\n",
    "    AIMessage,\n",
    "    BaseMessage,\n",
    "    FunctionMessage,\n",
    "    HumanMessage,\n",
    "    SystemMessage,\n",
    "    ToolMessage,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "53033447-8260-4f53-bd6f-b2f744e04e75",
   "metadata": {},
   "source": [
    "### Streaming Variant\n",
    "\n",
    "All the chat messages have a streaming variant that contains `Chunk` in the name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "d4656e9d-bfa1-4703-8f79-762fe6421294",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from langchain_core.messages import (\n",
    "    AIMessageChunk,\n",
    "    FunctionMessageChunk,\n",
    "    HumanMessageChunk,\n",
    "    SystemMessageChunk,\n",
    "    ToolMessageChunk,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "81ebf3f4-c760-4898-b921-fdb469453d4a",
   "metadata": {},
   "source": [
    "These chunks are used when streaming output from chat models, and they all define an additive property!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "9c15c299-6f8a-49cf-a072-09924fd44396",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessageChunk(content='Hello World!')"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "AIMessageChunk(content=\"Hello\") + AIMessageChunk(content=\" World!\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bbfebea1",
   "metadata": {},
   "source": [
    "## Base Chat Model\n",
    "\n",
    "Let's implement a chat model that echoes back the first `n` characters of the last message in the prompt!\n",
    "\n",
    "To do so, we will inherit from `BaseChatModel` and we'll need to implement the following:\n",
    "\n",
    "| Method/Property                    | Description                                                       | Required/Optional  |\n",
    "|------------------------------------|-------------------------------------------------------------------|--------------------|\n",
    "| `_generate`                        | Use to generate a chat result from a prompt                       | Required           |\n",
    "| `_llm_type` (property)             | Used to uniquely identify the type of the model. Used for logging.| Required           |\n",
    "| `_identifying_params` (property)   | Represent model parameterization for tracing purposes.            | Optional           |\n",
    "| `_stream`                          | Use to implement streaming.                                       | Optional           |\n",
    "| `_agenerate`                       | Use to implement a native async method.                           | Optional           |\n",
    "| `_astream`                         | Use to implement async version of `_stream`.                      | Optional           |\n",
    "\n",
    "\n",
    ":::{.callout-tip}\n",
    "The `_astream` implementation uses `run_in_executor` to launch the sync `_stream` in a separate thread if `_stream` is implemented, otherwise it fallsback to use `_agenerate`.\n",
    "\n",
    "You can use this trick if you want to reuse the `_stream` implementation, but if you're able to implement code that's natively async that's a better solution since that code will run with less overhead.\n",
    ":::"
   ]
  },
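  {
   "cell_type": "markdown",
   "id": "7b1e4c90-3d2a-4f6b-9a1c-5e8d2f0a6b31",
   "metadata": {},
   "source": [
    "The thread-based fallback described in the tip above can be illustrated without LangChain. The following is a minimal sketch using only `asyncio`, not LangChain's actual implementation; `sync_stream`, `astream_via_executor`, and `collect` are hypothetical names chosen for this example.\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "from typing import AsyncIterator, Iterator, List\n",
    "\n",
    "\n",
    "def sync_stream(text: str) -> Iterator[str]:\n",
    "    # Hypothetical stand-in for a blocking `_stream` implementation.\n",
    "    for char in text:\n",
    "        yield char\n",
    "\n",
    "\n",
    "async def astream_via_executor(text: str) -> AsyncIterator[str]:\n",
    "    # Pull each item from the sync iterator in a worker thread so the\n",
    "    # event loop is not blocked while waiting for the next chunk.\n",
    "    loop = asyncio.get_running_loop()\n",
    "    iterator = sync_stream(text)\n",
    "    done = object()  # Sentinel that marks the end of the stream.\n",
    "    while True:\n",
    "        item = await loop.run_in_executor(None, next, iterator, done)\n",
    "        if item is done:\n",
    "            break\n",
    "        yield item\n",
    "\n",
    "\n",
    "async def collect(text: str) -> List[str]:\n",
    "    # Gather every streamed chunk into a list.\n",
    "    return [chunk async for chunk in astream_via_executor(text)]\n",
    "\n",
    "\n",
    "print(asyncio.run(collect('cat')))\n",
    "```\n",
    "\n",
    "Every chunk costs one hand-off to a worker thread, which is the overhead that a natively async implementation avoids."
   ]
  },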
  {
   "cell_type": "markdown",
   "id": "8e7047bd-c235-46f6-85e1-d6d7e0868eb1",
   "metadata": {},
   "source": [
    "### Implementation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "25ba32e5-5a6d-49f4-bb68-911827b84d61",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from typing import Any, AsyncIterator, Dict, Iterator, List, Optional\n",
    "\n",
    "from langchain_core.callbacks import (\n",
    "    AsyncCallbackManagerForLLMRun,\n",
    "    CallbackManagerForLLMRun,\n",
    ")\n",
    "from langchain_core.language_models import BaseChatModel, SimpleChatModel\n",
    "from langchain_core.messages import AIMessageChunk, BaseMessage, HumanMessage\n",
    "from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult\n",
    "from langchain_core.runnables import run_in_executor\n",
    "\n",
    "\n",
    "class CustomChatModelAdvanced(BaseChatModel):\n",
    "    \"\"\"A custom chat model that echoes the first `n` characters of the input.\n",
    "\n",
    "    When contributing an implementation to LangChain, carefully document\n",
    "    the model including the initialization parameters, include\n",
    "    an example of how to initialize the model and include any relevant\n",
    "    links to the underlying models documentation or API.\n",
    "\n",
    "    Example:\n",
    "\n",
    "        .. code-block:: python\n",
    "\n",
    "            model = CustomChatModel(n=2)\n",
    "            result = model.invoke([HumanMessage(content=\"hello\")])\n",
    "            result = model.batch([[HumanMessage(content=\"hello\")],\n",
    "                                 [HumanMessage(content=\"world\")]])\n",
    "    \"\"\"\n",
    "\n",
    "    model_name: str\n",
    "    \"\"\"The name of the model\"\"\"\n",
    "    n: int\n",
    "    \"\"\"The number of characters from the last message of the prompt to be echoed.\"\"\"\n",
    "\n",
    "    def _generate(\n",
    "        self,\n",
    "        messages: List[BaseMessage],\n",
    "        stop: Optional[List[str]] = None,\n",
    "        run_manager: Optional[CallbackManagerForLLMRun] = None,\n",
    "        **kwargs: Any,\n",
    "    ) -> ChatResult:\n",
    "        \"\"\"Override the _generate method to implement the chat model logic.\n",
    "\n",
    "        This can be a call to an API, a call to a local model, or any other\n",
    "        implementation that generates a response to the input prompt.\n",
    "\n",
    "        Args:\n",
    "            messages: the prompt composed of a list of messages.\n",
    "            stop: a list of strings on which the model should stop generating.\n",
    "                  If generation stops due to a stop token, the stop token itself\n",
    "                  SHOULD BE INCLUDED as part of the output. This is not enforced\n",
    "                  across models right now, but it's a good practice to follow since\n",
    "                  it makes it much easier to parse the output of the model\n",
    "                  downstream and understand why generation stopped.\n",
    "            run_manager: A run manager with callbacks for the LLM.\n",
    "        \"\"\"\n",
    "        # Replace this with actual logic to generate a response from a list\n",
    "        # of messages.\n",
    "        last_message = messages[-1]\n",
    "        tokens = last_message.content[: self.n]\n",
    "        message = AIMessage(\n",
    "            content=tokens,\n",
    "            additional_kwargs={},  # Used to add additional payload (e.g., function calling request)\n",
    "            response_metadata={  # Use for response metadata\n",
    "                \"time_in_seconds\": 3,\n",
    "            },\n",
    "        )\n",
    "        ##\n",
    "\n",
    "        generation = ChatGeneration(message=message)\n",
    "        return ChatResult(generations=[generation])\n",
    "\n",
    "    def _stream(\n",
    "        self,\n",
    "        messages: List[BaseMessage],\n",
    "        stop: Optional[List[str]] = None,\n",
    "        run_manager: Optional[CallbackManagerForLLMRun] = None,\n",
    "        **kwargs: Any,\n",
    "    ) -> Iterator[ChatGenerationChunk]:\n",
    "        \"\"\"Stream the output of the model.\n",
    "\n",
    "        This method should be implemented if the model can generate output\n",
    "        in a streaming fashion. If the model does not support streaming,\n",
    "        do not implement it. In that case streaming requests will be automatically\n",
    "        handled by the _generate method.\n",
    "\n",
    "        Args:\n",
    "            messages: the prompt composed of a list of messages.\n",
    "            stop: a list of strings on which the model should stop generating.\n",
    "                  If generation stops due to a stop token, the stop token itself\n",
    "                  SHOULD BE INCLUDED as part of the output. This is not enforced\n",
    "                  across models right now, but it's a good practice to follow since\n",
    "                  it makes it much easier to parse the output of the model\n",
    "                  downstream and understand why generation stopped.\n",
    "            run_manager: A run manager with callbacks for the LLM.\n",
    "        \"\"\"\n",
    "        last_message = messages[-1]\n",
    "        tokens = last_message.content[: self.n]\n",
    "\n",
    "        for token in tokens:\n",
    "            chunk = ChatGenerationChunk(message=AIMessageChunk(content=token))\n",
    "\n",
    "            if run_manager:\n",
    "                # This is optional in newer versions of LangChain\n",
    "                # The on_llm_new_token will be called automatically\n",
    "                run_manager.on_llm_new_token(token, chunk=chunk)\n",
    "\n",
    "            yield chunk\n",
    "\n",
    "        # Let's add some other information (e.g., response metadata)\n",
    "        chunk = ChatGenerationChunk(\n",
    "            message=AIMessageChunk(content=\"\", response_metadata={\"time_in_sec\": 3})\n",
    "        )\n",
    "        if run_manager:\n",
    "            # This is optional in newer versions of LangChain\n",
    "            # The on_llm_new_token will be called automatically\n",
    "            run_manager.on_llm_new_token(token, chunk=chunk)\n",
    "        yield chunk\n",
    "\n",
    "    @property\n",
    "    def _llm_type(self) -> str:\n",
    "        \"\"\"Get the type of language model used by this chat model.\"\"\"\n",
    "        return \"echoing-chat-model-advanced\"\n",
    "\n",
    "    @property\n",
    "    def _identifying_params(self) -> Dict[str, Any]:\n",
    "        \"\"\"Return a dictionary of identifying parameters.\n",
    "\n",
    "        This information is used by the LangChain callback system, which\n",
    "        is used for tracing purposes make it possible to monitor LLMs.\n",
    "        \"\"\"\n",
    "        return {\n",
    "            # The model name allows users to specify custom token counting\n",
    "            # rules in LLM monitoring applications (e.g., in LangSmith users\n",
    "            # can provide per token pricing for their model and monitor\n",
    "            # costs for the given LLM.)\n",
    "            \"model_name\": self.model_name,\n",
    "        }"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1e9af284-f2d3-44e2-ac6a-09b73d89ada3",
   "metadata": {},
   "source": [
    "### Let's test it 🧪\n",
    "\n",
    "The chat model will implement the standard `Runnable` interface of LangChain which many of the LangChain abstractions support!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "27689f30-dcd2-466b-ba9d-f60b7d434110",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessage(content='Meo', response_metadata={'time_in_seconds': 3}, id='run-ddb42bd6-4fdd-4bd2-8be5-e11b67d3ac29-0')"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model = CustomChatModelAdvanced(n=3, model_name=\"my_custom_model\")\n",
    "\n",
    "model.invoke(\n",
    "    [\n",
    "        HumanMessage(content=\"hello!\"),\n",
    "        AIMessage(content=\"Hi there human!\"),\n",
    "        HumanMessage(content=\"Meow!\"),\n",
    "    ]\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "406436df-31bf-466b-9c3d-39db9d6b6407",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessage(content='hel', response_metadata={'time_in_seconds': 3}, id='run-4d3cc912-44aa-454b-977b-ca02be06c12e-0')"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model.invoke(\"hello\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "a72ffa46-6004-41ef-bbe4-56fa17a029e2",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[AIMessage(content='hel', response_metadata={'time_in_seconds': 3}, id='run-9620e228-1912-4582-8aa1-176813afec49-0'),\n",
       " AIMessage(content='goo', response_metadata={'time_in_seconds': 3}, id='run-1ce8cdf8-6f75-448e-82f7-1bb4a121df93-0')]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model.batch([\"hello\", \"goodbye\"])"
   ]
  },
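  {
   "cell_type": "markdown",
   "id": "c2d9f5a7-61e4-4b38-8f0d-2a7b9e13c4d6",
   "metadata": {},
   "source": [
    "As mentioned at the start of this guide, the default `batch` runs inputs concurrently on a thread pool. Here is a minimal standard-library sketch of that idea, not LangChain's actual implementation; `echo` is a hypothetical stand-in for a blocking `model.invoke` call, and `batch_with_threads` and `max_workers` are names assumed for illustration.\n",
    "\n",
    "```python\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "from typing import List\n",
    "\n",
    "\n",
    "def echo(text: str, n: int = 3) -> str:\n",
    "    # Hypothetical stand-in for a blocking `model.invoke` call.\n",
    "    return text[:n]\n",
    "\n",
    "\n",
    "def batch_with_threads(inputs: List[str], max_workers: int = 4) -> List[str]:\n",
    "    # `executor.map` runs the calls concurrently and returns the\n",
    "    # results in the same order as the inputs.\n",
    "    with ThreadPoolExecutor(max_workers=max_workers) as executor:\n",
    "        return list(executor.map(echo, inputs))\n",
    "\n",
    "\n",
    "print(batch_with_threads(['hello', 'goodbye']))\n",
    "```\n",
    "\n",
    "Threads help most when each call spends its time waiting on I/O, such as a request to a model API."
   ]
  },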
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "3633be2c-2ea0-42f9-a72f-3b5240690b55",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "c|a|t||"
     ]
    }
   ],
   "source": [
    "for chunk in model.stream(\"cat\"):\n",
    "    print(chunk.content, end=\"|\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3f8a7c42-aec4-4116-adf3-93133d409827",
   "metadata": {},
   "source": [
    "Please see the implementation of `_astream` in the model! If you do not implement it, then no output will stream.!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "b7d73995-eeab-48c6-a7d8-32c98ba29fc2",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "c|a|t||"
     ]
    }
   ],
   "source": [
    "async for chunk in model.astream(\"cat\"):\n",
    "    print(chunk.content, end=\"|\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f80dc55b-d159-4527-9191-407a7c6d6042",
   "metadata": {},
   "source": [
    "Let's try to use the astream events API which will also help double check that all the callbacks were implemented!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "17840eba-8ff4-4e73-8e4f-85f16eb1c9d0",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_chat_model_start', 'run_id': '125a2a16-b9cd-40de-aa08-8aa9180b07d0', 'name': 'CustomChatModelAdvanced', 'tags': [], 'metadata': {}, 'data': {'input': 'cat'}}\n",
      "{'event': 'on_chat_model_stream', 'run_id': '125a2a16-b9cd-40de-aa08-8aa9180b07d0', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='c', id='run-125a2a16-b9cd-40de-aa08-8aa9180b07d0')}}\n",
      "{'event': 'on_chat_model_stream', 'run_id': '125a2a16-b9cd-40de-aa08-8aa9180b07d0', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='a', id='run-125a2a16-b9cd-40de-aa08-8aa9180b07d0')}}\n",
      "{'event': 'on_chat_model_stream', 'run_id': '125a2a16-b9cd-40de-aa08-8aa9180b07d0', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='t', id='run-125a2a16-b9cd-40de-aa08-8aa9180b07d0')}}\n",
      "{'event': 'on_chat_model_stream', 'run_id': '125a2a16-b9cd-40de-aa08-8aa9180b07d0', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='', response_metadata={'time_in_sec': 3}, id='run-125a2a16-b9cd-40de-aa08-8aa9180b07d0')}}\n",
      "{'event': 'on_chat_model_end', 'name': 'CustomChatModelAdvanced', 'run_id': '125a2a16-b9cd-40de-aa08-8aa9180b07d0', 'tags': [], 'metadata': {}, 'data': {'output': AIMessageChunk(content='cat', response_metadata={'time_in_sec': 3}, id='run-125a2a16-b9cd-40de-aa08-8aa9180b07d0')}}\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/eugene/src/langchain/libs/core/langchain_core/_api/beta_decorator.py:87: LangChainBetaWarning: This API is in beta and may change in the future.\n",
      "  warn_beta(\n"
     ]
    }
   ],
   "source": [
    "async for event in model.astream_events(\"cat\", version=\"v1\"):\n",
    "    print(event)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "44ee559b-b1da-4851-8c97-420ab394aff9",
   "metadata": {},
   "source": [
    "## Contributing\n",
    "\n",
    "We appreciate all chat model integration contributions. \n",
    "\n",
    "Here's a checklist to help make sure your contribution gets added to LangChain:\n",
    "\n",
    "Documentation:\n",
    "\n",
    "* The model contains doc-strings for all initialization arguments, as these will be surfaced in the [APIReference](https://api.python.langchain.com/en/stable/langchain_api_reference.html).\n",
    "* The class doc-string for the model contains a link to the model API if the model is powered by a service.\n",
    "\n",
    "Tests:\n",
    "\n",
    "* [ ] Add unit or integration tests to the overridden methods. Verify that `invoke`, `ainvoke`, `batch`, `stream` work if you've over-ridden the corresponding code.\n",
    "\n",
    "\n",
    "Streaming (if you're implementing it):\n",
    "\n",
    "* [ ] Implement the _stream method to get streaming working\n",
    "\n",
    "Stop Token Behavior:\n",
    "\n",
    "* [ ] Stop token should be respected\n",
    "* [ ] Stop token should be INCLUDED as part of the response\n",
    "\n",
    "Secret API Keys:\n",
    "\n",
    "* [ ] If your model connects to an API it will likely accept API keys as part of its initialization. Use Pydantic's `SecretStr` type for secrets, so they don't get accidentally printed out when folks print the model.\n",
    "\n",
    "\n",
    "Identifying Params:\n",
    "\n",
    "* [ ] Include a `model_name` in identifying params\n",
    "\n",
    "\n",
    "Optimizations:\n",
    "\n",
    "Consider providing native async support to reduce the overhead from the model!\n",
    " \n",
    "* [ ] Provided a native async of `_agenerate` (used by `ainvoke`)\n",
    "* [ ] Provided a native async of `_astream` (used by `astream`)\n",
    "\n",
    "## Next steps\n",
    "\n",
    "You've now learned how to create your own custom chat models.\n",
    "\n",
    "Next, check out the other how-to guides chat models in this section, like [how to get a model to return structured output](/docs/how_to/structured_output) or [how to track chat model token usage](/docs/how_to/chat_token_usage_tracking)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
